package untils;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class SensonSource implements SourceFunction<SensonReading> {

    private boolean running = true;
    private Random random = new Random();
    @Override
    public void run(SourceContext<SensonReading> sourceContext) throws Exception {
        while (running) {
            for (int i = 0; i < 4; i++) {
                sourceContext.collect(
                        new SensonReading(
                                "season_" + i,
                                random.nextInt(10000)
                        )
                );
            }
            Thread.sleep(300L);
        }
    }

    @Override
    public void cancel() {
        running = false;

    }
}
